import 'dart:async';

import 'package:rxdart/rxdart.dart';

import '../../common/test_page.dart';
import '../utils.dart';

/// Test page that registers the rxdart `exhaustMap` scenarios.
///
/// `exhaustMap` maps each source item to a stream and ignores further source
/// items until that mapped stream has completed.
///
/// The `group`, `test`, `expect` and `nullableTest` helpers used below are not
/// defined in this file (they are inherited from [TestPage] or imported from
/// `../utils.dart`). NOTE(review): `expect` is always called with a single
/// argument here, so it is presumably a recording helper rather than the
/// matcher-based `expect` from `package:test` — confirm against the harness.
class ExhaustMapTestPage extends TestPage {
  /// Creates the page with the given title and registers every test case.
  ExhaustMapTestPage(super.title) {
    group('ExhaustMap', () {
      test('does not create a new Stream while emitting', () async {
        // Counts how many times the mapper is invoked.
        var calls = 0;
        final stream = Rx.range(0, 9).exhaustMap((i) {
          calls++;
          return Rx.timer(i, const Duration(milliseconds: 100));
        });

        expect(stream);
        // NOTE(review): `calls` is read right after `stream` is handed off,
        // without awaiting anything in between — verify the harness observes
        // the value at the intended moment.
        expect(calls);
      });

      test('starts emitting again after previous Stream is complete', () async {
        final stream = Stream.fromIterable(const [0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
            .interval(const Duration(milliseconds: 30))
            .exhaustMap((i) async* {
          // The delay must be awaited: yielding the bare Future would emit a
          // `Future<int>` and complete the mapped stream immediately, so no
          // source item would ever arrive while a mapped stream is active.
          yield await Future.delayed(
            const Duration(milliseconds: 70),
            () => i,
          );
        });

        expect(stream);
      });

      test('is reusable', () async {
        // The same transformer instance is applied to two separate sources.
        final transformer = ExhaustMapStreamTransformer(
          (int i) => Rx.timer(i, const Duration(milliseconds: 100)),
        );

        expect(Rx.range(0, 9).transform(transformer));
        expect(Rx.range(0, 9).transform(transformer));
      });

      test('works as a broadcast stream', () async {
        final stream = Rx.range(0, 9).asBroadcastStream().exhaustMap(
              (i) => Rx.timer(i, const Duration(milliseconds: 100)),
            );

        // Two listeners are attached to the same transformed stream.
        expect(() {
          stream.listen(null);
          stream.listen(null);
        });
      });

      test('should emit errors from source', () async {
        final streamWithError = Stream<int>.error(Exception()).exhaustMap(
          (i) => Rx.timer(i, const Duration(milliseconds: 100)),
        );

        expect(streamWithError);
      });

      test('should emit errors from mapped stream', () async {
        final streamWithError = Stream.value(1).exhaustMap(
          (_) => Stream<void>.error(Exception('Catch me if you can!')),
        );

        expect(streamWithError);
      });

      test('should emit errors thrown in the mapper', () async {
        final streamWithError = Stream.value(1).exhaustMap<void>((_) {
          throw Exception('oh noes!');
        });

        expect(streamWithError);
      });

      test('can be paused and resumed', () async {
        late StreamSubscription<int> subscription;
        // Keeps the test body alive until the listener has actually run;
        // otherwise the async body would return before any value arrives.
        final firstValueSeen = Completer<void>();
        final stream = Rx.range(0, 9)
            .exhaustMap((i) => Rx.timer(i, const Duration(milliseconds: 20)));

        subscription = stream.listen((value) {
          expect(value);
          subscription.cancel();
          if (!firstValueSeen.isCompleted) {
            firstValueSeen.complete();
          }
        });

        subscription.pause();
        subscription.resume();

        await firstValueSeen.future;
      });

      test('Rx.exhaustMap accidental broadcast', () async {
        // A default StreamController exposes a single-subscription stream.
        final controller = StreamController<int>();

        final stream = controller.stream.exhaustMap((_) => Stream<int>.empty());

        stream.listen(null);
        // The second listen is wrapped in a closure and handed to `expect`.
        // NOTE(review): presumably the harness checks that it throws — confirm.
        expect(() => stream.listen(null));

        controller.add(1);
      });

      test('Rx.exhaustMap.nullable', () {
        nullableTest<String?>(
          (s) => s.exhaustMap((v) => Stream.value(v)),
        );
      });
    });
  }
}